Workflowsで使えるBigQueryのqueryコネクタを用いてBigQueryにクエリ発行する時はtimeoutMsフィールドに注意

Workflowsで使えるBigQueryのqueryコネクタを用いてBigQueryにクエリ発行する時はtimeoutMsフィールドに注意

Clock Icon2024.12.16

概要

WorkflowsのBigQueryコネクタ(query)を用いるとWorkflowsからBigQueryにSQLを実行することができます。

Method: googleapis.bigquery.v2.jobs.query

https://cloud.google.com/workflows/docs/reference/googleapis/bigquery/v2/jobs/query

とっても便利なqueryコネクタですが、注意が必要な点があったので記事にしてみました。

注意点

queryコネクタのリファレンスには以下の通り概要が記載されています。

Runs a BigQuery SQL query synchronously and returns query results if the query completes within a specified timeout.
BigQueryのSQLクエリを同期的に実行し、指定したタイムアウト内にクエリが完了した場合にクエリ結果を返します。

つまりこのコネクタは同期的に実行されるので、SELECT文などで結果を取得してWorkflowsで扱う場合にポーリングをして待つ必要がないものとなります。
そして重要なのは指定したタイムアウトという箇所です。
queryコネクタは実行時の引数でタイムアウト時間を設定することができます。
タイムアウト時間を設定する引数はtimeoutMsというものですが、この指定を省略すると10秒でリクエストがタイムアウトします。

引数 説明
timeoutMs [Optional] How long to wait for the query to complete, in milliseconds, before the request times out and returns. Note that this is only a timeout for the request, not the query. If the query takes longer to run than the timeout value, the call returns without any results and with the 'jobComplete' flag set to false. You can call GetQueryResults() to wait for the query to complete and read the results. The default value is 10000 milliseconds (10 seconds). [オプション] リクエストがタイムアウトしてリターンするまでの、クエリが完了するまでの待ち時間をミリ秒単位で指定します。 これはリクエストのタイムアウトであり、クエリのタイムアウトではないことに注意してください。 クエリの実行にタイムアウト値よりも長い時間がかかった場合、コール結果は返されず、 'jobComplete' フラグが false に設定されます。 GetQueryResults() を呼び出して、クエリの完了を待ち、結果を読み取ることができます。 既定値は 10000 ミリ秒 (10 秒) です(DeepL翻訳)。

タイムアウトといってもクエリが終了されるわけではありません。WorkflowsがBigQueryへクエリ実行した後にBigQueryからの結果返却までの時間のタイムアウトとなります。
イメージとしては以下となります(あくまでイメージ図です)
スクリーンショット 2024-12-16 23.45.41
なのでtimeoutMsを省略したqueryコネクタを用いた実装を行った場合は、10秒以上かかるクエリの場合はBigQuery側ではSQLは実行されているが、Workflowsへ結果が返却されないということになります。この場合、実行結果をjobidでポーリングして待機するか、またはgetQueryResultsコネクタを用いて結果取得する必要があります。
https://cloud.google.com/workflows/docs/reference/googleapis/bigquery/v2/jobs/getQueryResults

実際に試してみる

準備

前提として10秒以上かかるクエリを用意する必要があります(これが一番大変でした)。
外部テーブルを用意して、Workflowsからクエリ発行します。3GBのJSONLinesのデータを作成して、COUNTを取るというクエリで大体15秒程度でした。
一応以下にダミーデータ作成のスクリプト、DDL、SQLを示します。本筋ではないので解説はしません。

  • スクリプト
import json
import random
import string
import os

# 3GBの目標サイズ(バイト単位)
TARGET_SIZE = 3 * 1024 * 1024 * 1024

# 出力ファイル名
OUTPUT_FILE = "large_file.jsonl"

def random_string(length=10):
    """ランダムな文字列を生成する"""
    return ''.join(random.choices(string.ascii_letters + string.digits, k=length))

def random_record():
    """ランダムなレコードを生成する"""
    return {
        "id": random.randint(1, 1000000),
        "name": random_string(10),
        "email": f"{random_string(5)}@example.com",
        "age": random.randint(18, 99),
        "address": {
            "city": random_string(8),
            "zipcode": random.randint(10000, 99999),
        },
        "is_active": random.choice([True, False]),
        "tags": [random_string(5) for _ in range(random.randint(1, 5))]
    }

def create_large_jsonlines_file():
    """JSONLinesファイルを作成する"""
    with open(OUTPUT_FILE, "w") as f:
        current_size = 0

        while current_size < TARGET_SIZE:
            record = random_record()
            json_record = json.dumps(record)
            f.write(json_record + "\n")  # 各レコードを1行に書き込む

            current_size += len(json_record.encode("utf-8")) + 1  # +1は改行文字の分

    print(f"JSONLinesファイル '{OUTPUT_FILE}' が作成されました。サイズ: {os.path.getsize(OUTPUT_FILE) / (1024 * 1024):.2f} MB")

if __name__ == "__main__":
    create_large_jsonlines_file()
  • DDL
CREATE OR REPLACE EXTERNAL TABLE tmp_nemoto.test(
  id INT64,
  name STRING,
  email STRING,
  age INT64,
  address STRUCT<
    city STRING,
    zipcode INT64
  >,
  is_active BOOL,
  tags ARRAY<STRING>
)
  OPTIONS (
  format = 'json',
  uris = ['gs://バケット名/bq_data/large_file*.jsonl']);
  • SQL
SELECT
  COUNT(*) AS table_count
FROM データセット名.test

Workflowsを実行してみる

実行するSQLファイルをCloud Storageバケットに保存しておいて、それを読み取ってSQLを実行するワークフローを用意しました。
以下はワークフローのソース全文です。

- init:
    assign:
        - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
        - sqlBucket: "バケット名"
        - sqlFilePath: "SQLファイル名"
- readSqlFiles:
    call: googleapis.storage.v1.objects.get
    args:
        bucket: ${sqlBucket}
        object: ${text.url_encode(sqlFilePath)}
        alt: "media"
    result: sqlFileContent
- executeQuery:
    call: googleapis.bigquery.v2.jobs.query
    args:
        projectId: ${project_id}
        body:
            query: ${text.decode(sqlFileContent)}
            useLegacySql: false
            useQueryCache: false
            #timeoutMs: 20000 まずはコメントアウト
    result: queryResult
- executeQueryLog:
    call: sys.log
    args:
        text: ${queryResult}
        severity: "INFO"
- returnStep:
    return: ${queryResult}

まずtimeoutMsを設定しない状態で実行します。キャッシュされてしまうと意図したタイムアウトしなくなってしまうのでuseQueryCacheをfalse(キャッシュしない)に設定しています。

実行すると以下のレスポンスが出力されます。
スクリーンショット 2024-12-16 23.00.44

{
  "jobComplete": false,
  "jobCreationReason": {
    "code": "REQUESTED"
  },
  "jobReference": {
    "jobId": "job_8vPjkjEh-3lzGFq0gW6hGl3RwQW7",
    "location": "asia-northeast1",
    "projectId": "プロジェクトID"
  },
  "kind": "bigquery#queryResponse",
  "queryId": "job_8vPjkjEh-3lzGFq0gW6hGl3RwQW7"
}

jobCompletefalseになっているのでジョブ実行中にタイムアウトとなっていることがわかります。
BigQueryのジョブID(queryId)が出ているのでINFORMATION_SCHEMA.JOBS Viewから本当に10秒以上実行時間がかかっていたかみてみます。

SELECT
  job_id,
  state,
  TIMESTAMP_DIFF(end_time, start_time, SECOND) AS execution_time_seconds
FROM
  `region-asia-northeast1`.`INFORMATION_SCHEMA.JOBS`
WHERE job_id = 'job_8vPjkjEh-3lzGFq0gW6hGl3RwQW7';

11秒かかっていることが確認できました。
スクリーンショット 2024-12-16 23.11.25

では、今度はtimeoutMsを20000ミリ秒(=20秒)に設定して実行してみます。

- init:
    assign:
        - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
        - sqlBucket: "バケット名"
        - sqlFilePath: "SQLファイル名"
- readSqlFiles:
    call: googleapis.storage.v1.objects.get
    args:
        bucket: ${sqlBucket}
        object: ${text.url_encode(sqlFilePath)}
        alt: "media"
    result: sqlFileContent
- executeQuery:
    call: googleapis.bigquery.v2.jobs.query
    args:
        projectId: ${project_id}
        body:
            query: ${text.decode(sqlFileContent)}
            useLegacySql: false
            useQueryCache: false
            timeoutMs: 20000 #コメントアウト解除
    result: queryResult
- executeQueryLog:
    call: sys.log
    args:
        text: ${queryResult}
        severity: "INFO"
- returnStep:
    return: ${queryResult}

実行結果を見てみます。
スクリーンショット 2024-12-16 23.19.34

実行時間は11秒ちょいで、SQLの実行は完了してクエリ結果も返ってきているように見えますね!
こちらもJSONをみてみます。

{
  "cacheHit": false,
  "jobComplete": true,
  "jobCreationReason": {
    "code": "REQUESTED"
  },
  "jobReference": {
    "jobId": "job_kXqnrVt-7mxMmB0sde_ausdLsbnJ",
    "location": "asia-northeast1",
    "projectId": "プロジェクトID"
  },
  "kind": "bigquery#queryResponse",
  "queryId": "job_kXqnrVt-7mxMmB0sde_ausdLsbnJ",
  "rows": [
    {
      "f": [
        {
          "v": "17376189"
        }
      ]
    }
  ],
  "schema": {
    "fields": [
      {
        "mode": "NULLABLE",
        "name": "table_count",
        "type": "INTEGER"
      }
    ]
  },
  "totalBytesProcessed": "3221225601",
  "totalRows": "1"
}

jobCompletetrueになっていますね。結果も期待した値(17376189)が返ってきています。
※テーブルのデータはランダムで作成しているので作成タイミングによって期待する値は異なります。

上記より、timeoutMsを設定することで10秒以上かかるクエリでもWorkflowsで結果取得することができました。

おまけ

timeoutMsに設定できる上限値は記載がありませんでしたが、BigQueryのSQLの実行時間上限の6時間(=21,600,000ミリ秒)は設定することができました。

まとめ

queryコネクタを用いてBigQueryにSQLを実行して、Workflows側に取得した結果やSQLの実行状態(失敗したかどうか)を検知したい場合はtimeoutMsを設定するようにしましょう。
逆に、そういったことが不要な場合はtimeoutMsを設定せずに(デフォルト10秒が設定されますが)実装してしまっても良いのかなと思います。
意外と見落とすことがあるかもしれない引っかかりポイントかもと思ったので記事にしてみました。

それではまた、ナマステー

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.